package com.we.risk.phoneRecord.tools;

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

import com.alibaba.fastjson.JSONObject;
import com.we.flink.utils.WeKafkaPropertyReader;
import com.we.risk.phoneRecord.stage2.dimuser.DimUserLabelAggregateDe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

/**
 * Entry point that configures a Flink streaming environment (RocksDB state backend plus
 * checkpointing) and builds a Kafka producer for JSON string records.
 *
 * <p>Records are keyed by the {@link #OUTPUT_KEY_BY} field of the JSON payload and routed to a
 * partition derived from the hash of that key.
 *
 * <p>NOTE(review): the line that attaches the producer as a sink is commented out and no source
 * is registered in {@link #main(String[])}, so as written no operators are added to the
 * environment before {@code execute} is called — confirm whether this tool is still work in
 * progress.
 */
public class SendBigJson2Kfk {
    // Field-name constants. Only OUTPUT_KEY_BY is referenced inside this class; the others are
    // presumably JSON field names consumed elsewhere — verify against callers.
    public static final String INPUT_KEY_BY = "uid";
    public static final String OUTPUT_KEY_BY = "user_key";
    public static final String PIDPROV = "pid_prov";
    public static final String PIDCITY = "pid_city";
    public static final String PIDAREA = "pid_area";
    public static final String MOBPROV = "mob_prov";
    public static final String MOBCITY = "mob_city";
    public static final String ADPROV = "ad_prov";
    public static final String ADCITY = "ad_city";
    public static final String ADCOUNTY = "ad_county";

    public static final String SPLITWORD = "#";

    /** Properties resource read by {@link #main(String[])} for RocksDB and Kafka settings. */
    public static final String RELEASEPROP =
            "risk/phonerecord/stage2/dim_user_label_kfk_prod.properties";

    /** Properties resource path; not referenced inside this class. */
    public static final String ADMUSERLABELPROP =
            "risk/phonerecord/stage2/adm_user_label.properties";

    /** Logger for this class (previously bound to {@code DimUserLabelAggregateDe} by mistake). */
    public static Logger LOG = LoggerFactory.getLogger(SendBigJson2Kfk.class);

    /**
     * Charset used for every String/byte[] conversion of keys and values, so that the bytes
     * written to Kafka and the partition choice do not depend on the JVM's default charset.
     */
    private static final java.nio.charset.Charset RECORD_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Configures the execution environment, builds the Kafka producer and executes the job.
     *
     * <p>Any exception raised during setup or execution is logged (with its stack trace) and
     * then suppressed, so this method returns normally even on failure.
     *
     * @param args command-line arguments; not used
     * @throws IOException declared for callers; the body itself catches every {@link Exception}
     */
    public static void main(String[] args) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        try {
            WeKafkaPropertyReader paramReader = WeKafkaPropertyReader.init(RELEASEPROP);

            // State backend: RocksDB, located at the URL taken from the properties file.
            env.setStateBackend(new RocksDBStateBackend(paramReader.getRocksDBBackendUrl()));

            // Checkpoint configuration (all durations are in milliseconds).
            CheckpointConfig ckConf = env.getCheckpointConfig();
            ckConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            ckConf.setCheckpointInterval(10 * 60 * 1000); // 10 minutes
            ckConf.setCheckpointTimeout(60 * 60 * 1000); // 60 minutes
            ckConf.setMaxConcurrentCheckpoints(1);
            ckConf.setMinPauseBetweenCheckpoints(500);
            // Keep externalized checkpoints when the job is cancelled.
            ckConf.enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            ckConf.enableUnalignedCheckpoints();

            // Do not split into a tall table; output the wide table as-is.

            // Sink to Kafka: only bootstrap.servers is set on the producer properties.
            String sinkkfkTopic = paramReader.getKfkTopic();
            Properties sinkProp = new Properties();
            sinkProp.setProperty("bootstrap.servers", paramReader.getKfkBootStrapServer());
            // Only referenced by the commented-out addSink line below.
            int sinkkfkPartitions = paramReader.getKfkPartitions();

            FlinkKafkaProducer<String> userLabelAllFlinkKafkaProducer =
                    new FlinkKafkaProducer<String>(
                            sinkkfkTopic,
                            new WeKafkaKeyedSerializationSchema(),
                            sinkProp,
                            java.util.Optional.of(new WeKafkaCustomPartitioner()));

            //
            // out.addSink(userLabelAllFlinkKafkaProducer).setParallelism(sinkkfkPartitions);

            // NOTE(review): the job is named after DimUserLabelAggregateDe rather than this
            // class; left unchanged because the job name may be relied on externally — confirm.
            env.execute(DimUserLabelAggregateDe.class.toString());
        } catch (Exception e) {
            // Pass the throwable to the logger so the stack trace reaches the configured log
            // output instead of only stderr.
            LOG.error("Exception: {}", e.getMessage(), e);
        }
    }

    /**
     * Serialization schema that uses the {@link #OUTPUT_KEY_BY} field of a JSON record as the
     * Kafka message key and the full record text as the message value.
     */
    private static class WeKafkaKeyedSerializationSchema
            implements KeyedSerializationSchema<String> {
        /**
         * Extracts the {@link #OUTPUT_KEY_BY} field from the JSON record and encodes it.
         *
         * @param element JSON text of the record
         * @return bytes of the key field value
         * @throws NullPointerException if the record has no {@link #OUTPUT_KEY_BY} field
         */
        @Override
        public byte[] serializeKey(String element) {
            JSONObject jsonObject = JSONObject.parseObject(element);
            String keyby =
                    Objects.requireNonNull(
                            jsonObject.getString(OUTPUT_KEY_BY),
                            "record is missing key field '" + OUTPUT_KEY_BY + "'");
            return keyby.getBytes(RECORD_CHARSET);
        }

        /**
         * Encodes the whole record text as the message value.
         *
         * @param element JSON text of the record
         * @return bytes of the record text
         */
        @Override
        public byte[] serializeValue(String element) {
            return element.getBytes(RECORD_CHARSET);
        }

        /**
         * Supplies no per-record topic.
         *
         * @param element JSON text of the record; not used
         * @return always {@code null}; the producer is expected to fall back to the topic it was
         *     constructed with — see the Flink Kafka connector documentation
         */
        @Override
        public String getTargetTopic(String element) {
            return null;
        }
    }

    /** Partitioner that selects a partition from the hash of the record key. */
    private static class WeKafkaCustomPartitioner extends FlinkKafkaPartitioner<String> {

        /**
         * Computes {@code abs(hash(key) % partitions.length)} for the decoded key.
         *
         * @param record the record being sent; not used
         * @param key serialized key bytes, as produced by the serialization schema
         * @param value serialized value bytes; not used
         * @param targetTopic target topic; not used
         * @param partitions partitions of the target topic
         * @return a value in {@code [0, partitions.length)}. NOTE(review): this is an index into
         *     {@code partitions}, not {@code partitions[index]}; the two coincide only if the
         *     array holds ids 0..n-1 in order — confirm.
         */
        @Override
        public int partition(
                String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // Decode once, with the same charset used when the key was encoded.
            String keyText = new String(key, RECORD_CHARSET);
            // The remainder lies strictly between -length and +length, so Math.abs cannot
            // overflow here. The formula is kept as-is to preserve the existing key-to-partition
            // mapping.
            int partition = Math.abs(keyText.hashCode() % partitions.length);
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        " partitions: {} partition: {} key: {}",
                        partitions.length,
                        partition,
                        keyText);
            }
            return partition;
        }
    }
}
